package com.nx.platform.es.biz.esspider.source;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Sets;
import com.google.common.collect.Table;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
import com.nx.platform.es.common.utils.*;
import com.nx.platform.es.biz.esspider.deliver.Deliverer;
import com.nx.platform.es.biz.esspider.entity.Message;
import com.nx.platform.es.biz.esspider.expression.Expression;
import com.nx.platform.es.system.config.ConfigCenter;
import com.nx.platform.es.system.utils.IPUtil;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.Strings;
import org.jetbrains.annotations.NotNull;
import org.unidal.tuple.Triple;

import java.lang.reflect.Type;
import java.time.LocalDateTime;
import java.util.*;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.*;

/**
 * @author
 * @since 2017年3月24日
 */
public class MQSourceService extends AbstractIdleService implements SourceService {

    // MQ服务器地址
    private static final String NAME_SRV_ADDR = "name-srv-addr";
    // 消费者组名(各个消费者必须不同)
    private static final String CONSUMER_GROUP_NAME = "consumer-group-name";
    // 消费者线程数量(最小值)
    private static final String CONSUMER_THREAD_MIN = "consumer-thread-min";
    // 消费者线程数量(最大值)
    private static final String CONSUMER_THREAD_MAX = "consumer-thread-max";
    // 消费者消息批量接收最大值
    private static final String CONSUMER_MESSAGE_BATCH_MAX_SIZE = "consumer-message-batch-max-size";
    // 关注的主题
    private static final String TOPIC_INFOS = "topic-infos";
    // 稳定环境IP列表(使用配置的topic前缀)
    private static final String TOPIC_PREFIX_IP = "topic-prefix-ips";
    // 稳定环境TOPIC前缀(稳定环境下才生效)(非稳定环境使用ip后两域)
    private static final String TOPIC_PREFIX = "topic-prefix";
    // 关注的主题
    private static final String TOPICS = "topics";
    // 关注的标签
    private static final String TAGS = "tags";
    private static final String TAGS_DEFAULT = "*";
    // 消息体中表示数据ID的字段名称
    private static final String ID_FIELDS = "id-fields";
    // 消息对应的主题类型(info,user,...)
    private static final String CODES = "codes";
    private static final int CONSUMER_THREAD_MIN_DEFAULT = 20;
    private static final int CONSUMER_THREAD_MAX_DEFAULT = 64;
    private static final int CONSUMER_MESSAGE_BATCH_MAX_SIZE_DEFAULT = 1;

    private static final Pattern SEMICOLON = Pattern.compile(";");
    private static final Logger LOGGER = LogManager.getLogger(MQSourceService.class);
    private static final Gson GSON = new GsonBuilder().create();
    private static final Type TYPE = new TypeToken<Map<String, Object>>() {}.getType();

    private final Deliverer deliverer;
    private final boolean restartable;
    private final String configKey;

    private DefaultMQPushConsumer consumer;

    public MQSourceService(Deliverer deliverer, boolean restartable, String configKey) {
        this.deliverer = deliverer;
        this.restartable = restartable;
        this.configKey = configKey;
    }

    @Override
    protected void startUp() throws Exception {
        LOGGER.info("consumer start up");
        checkNotNull(deliverer);
        if (restartable) {
            ConfigCenter.getConfig(configKey, this::restartMQConsumer);
        } else {
            restartMQConsumer(ConfigCenter.getConfig(configKey).orElse(""));
        }
    }

    @Override
    protected void shutDown() throws Exception {
        if (consumer != null) {
            LOGGER.info("consumer shutdown: timestamp:{}", System.currentTimeMillis());
            consumer.shutdown();
        }
    }

    private Void restartMQConsumer(String mqConfig) throws Exception {

        // if started and non restartable, return directly
        if (consumer != null && !restartable) {
            return null;
        }

        // shutdown consumer if necessary
        if (consumer != null) {
            consumer.shutdown();
        }

        // parse config
        Map<?, ?> settings = YamlParser.parseToMap(mqConfig);
        checkState(MoreMaps.isNotEmpty(settings));

        String consumerGroupName = MoreMaps.getString(settings, CONSUMER_GROUP_NAME);
        checkArgument(StringUtils.isNotEmpty(consumerGroupName));

        String nameSrvAddr = MoreMaps.getString(settings, NAME_SRV_ADDR);
        checkArgument(StringUtils.isNotEmpty(nameSrvAddr));

        int batchMaxSize = MoreMaps.getIntValue(settings, CONSUMER_MESSAGE_BATCH_MAX_SIZE, CONSUMER_MESSAGE_BATCH_MAX_SIZE_DEFAULT);
        checkArgument(batchMaxSize > 0);

        int consumerThreadMin = MoreMaps.getIntValue(settings, CONSUMER_THREAD_MIN, CONSUMER_THREAD_MIN_DEFAULT);

        int consumerThreadMax = MoreMaps.getIntValue(settings, CONSUMER_THREAD_MAX, CONSUMER_THREAD_MAX_DEFAULT);
        checkArgument(consumerThreadMin > 0 && consumerThreadMax >= consumerThreadMin);

        List<String> topicInfos = MoreMaps.getStringList(settings, TOPIC_INFOS, MoreSplitters.COMMA);
        checkArgument(topicInfos != null && !topicInfos.isEmpty());

        List<String[]> topics = topicInfos.stream().map(SEMICOLON::split).filter(a -> a.length >= 4).collect(Collectors.toList());
        String topicPrefix = MoreMaps.getString(settings, TOPIC_PREFIX, "");
        String ip = IPUtil.getLocalHostIp().orElseThrow(() -> new IllegalStateException("getLocalHostIp error"));
        Set<String> ips = MoreMaps.getStringSet(settings, TOPIC_PREFIX_IP, MoreSplitters.COMMA);
        if (MoreSets.isNotEmpty(ips) && !MoreSets.contains(ips, ip)) {
            topicPrefix = IPUtil.getIpLast2Field(ip);
        }

        // init and start consumer
        // Table<topic, Pair<tags, targetId>, processors>
        Table<String, Triple<String, List<String>, Expression>, Set<String>> table = HashBasedTable.create();
        topicPrefix = topicPrefix.trim();
        consumer = new DefaultMQPushConsumer(topicPrefix + consumerGroupName);
        consumer.setNamesrvAddr(nameSrvAddr);
        for (String[] topic : topics) {
            String to = topicPrefix + topic[0].trim();
            String ta = topic[1].trim();
            String id = topic[2].trim();
            List<String> tys = MoreFunctions.toStringList(topic[3].trim(), MoreSplitters.VERTICAL);
            Set<String> tags = ta.equals(TAGS_DEFAULT) ?
                    Collections.emptySet() : Sets.newHashSet(ta.split("\\|\\|"));
            Expression expression = Expression.create("1=1");
            if (topic.length >= 5 && Strings.isNotBlank(topic[4])) {
                String condition = topic[4].trim();
                expression = Expression.create(condition);
                table.put(to, Triple.from(id, tys, expression), tags);
                LOGGER.info("topic:{}, tags:{}, id-field:{}, type:{}, condition:{}", to, ta, id, tys, condition);
            } else {
                table.put(to, Triple.from(id, tys, expression), tags);
                LOGGER.info("topic:{}, tags:{}, id-field:{}, type:{}", to, ta, id, tys);
            }
        }
        for (String topic : table.rowKeySet()) {
            String tags = table.row(topic).values().stream()
                    .reduce((a, b) -> (CollectionUtils.isEmpty(a) || CollectionUtils.isEmpty(b)) ?
                            Collections.emptySet() : new HashSet<>(CollectionUtils.union(a, b)))
                    .map(set -> Joiner.on("||").join(set))
                    .orElse(TAGS_DEFAULT);
            consumer.subscribe(topic, tags);
        }
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.setConsumeMessageBatchMaxSize(batchMaxSize);
        consumer.setConsumeThreadMin(consumerThreadMin);
        consumer.setConsumeThreadMax(consumerThreadMax);
        consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> {
            for (MessageExt msg : msgs) {
                List<Message> messages = parse(table.rowMap(), msg);
                messages.forEach(deliverer::deliver);
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
        return null;

    }

    @NotNull
    private List<Message> parse(
            Map<String, Map<Triple<String, List<String>, Expression>, Set<String>>> topics,//Table<topic, Pair<tags, targetId>, processors>
            MessageExt msg) {
        String msgId = msg.getMsgId();
        String topic = msg.getTopic();
        String tags = msg.getTags();
        String header = msg.getUserProperty("MQHeader");
        String body = new String(msg.getBody());
        String line = String.format("msgId:%s, topic:%s, tags:%s, header:%s body:%s", msgId, topic, tags, header, body);
        try {
            Preconditions.checkArgument(Strings.isNotBlank(topic) && Strings.isNotBlank(body),
                    "illegal mq message");
            Map<String, Object> map = GSON.fromJson(body, TYPE);
            Map<Triple<String, List<String>, Expression>, Set<String>> subscriber = topics.get(topic);
            List<Message> messages = new ArrayList<>();
            subscriber.forEach((triple, subsTags) -> {
                if (triple.getLast().eval(map) &&
                        (CollectionUtils.isEmpty(subsTags) || TAGS_DEFAULT.equals(tags) || subsTags.contains(tags))) {
                    Long id = MoreMaps.getLong(map, triple.getFirst());
                    if (id == null || CollectionUtils.isEmpty(triple.getMiddle())) {
                        LOGGER.error("idfield parse error topic={} body={}", topic, body);
                        return;
                    }
                    LOGGER.info("desc=recive_mq topic={} target={} id={} msgId={}", topic, tags, id, msgId);
                    for (String type : triple.getMiddle()) {
                        Message message = new Message();
                        message.setTimestamp(LocalDateTime.now());
                        message.setSource(this.getClass().getSimpleName());
                        message.setType(type);
                        message.setId(id);
                        message.setDoc(null);
                        message.setLine(line);
                        messages.add(message);
                    }
                }
            });
            checkArgument(!messages.isEmpty());
            return messages;
        } catch (Exception e) {
            LOGGER.error("Message ignored, {} ", line);
            return Collections.emptyList();
        }
    }

}
